﻿using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Naruto.Subscribe.Extension;
using Naruto.Subscribe.Interface;
using Naruto.Subscribe.Object;
using Naruto.Subscribe.Provider.Kafka.Extension;
using Naruto.Subscribe.Provider.Kafka.Interface;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Naruto.Subscribe.Provider.Kafka.Internal
{
    /// <summary>
    /// 消息发布提供者
    /// </summary>
    public class KafkaPublish : INarutoPublish
    {
        /// <summary>
        /// 生产者连接池
        /// </summary>
        private readonly IProducerConnectionPool _producerConnection;
        /// <summary>
        /// 日志
        /// </summary>
        private readonly ILogger _logger;
        /// <summary>
        /// 
        /// </summary>
        /// <param name="producerConnection"></param>
        /// <param name="_logger"></param>
        public KafkaPublish(IProducerConnectionPool producerConnection, ILogger<KafkaPublish> logger)
        {
            _producerConnection = producerConnection;
            _logger = logger;
        }
        /// <summary>
        /// 
        /// </summary>
        /// <param name="subscribeName"></param>
        /// <param name="msg"></param>
        /// <param name="headers"></param>
        public void Publish(string subscribeName, object msg = null, IDictionary<string, string> headers = null)
        {
            _logger.LogTrace("开始发送同步消息,subscribeName={subscribeName},msg={msg},headers={headers}", subscribeName, msg.ToJsonString(), headers.ToJsonString());
            var producer = GetProducer();
            try
            {
                //转换消息
                var messageModel = new MessageModel()
                {
                    Id = Guid.NewGuid().ToString(),
                    MessageContent = msg?.ToJsonString(),
                    SubscribeName = subscribeName
                };
                producer.Produce(subscribeName, new Message<string, byte[]>
                {
                    Headers = headers.ToHeaders(),
                    Value = messageModel.ToByte(),
                });
            }
            catch (Exception ex)
            {
                _logger.LogError("开始发送同步消息,消息发送失败,入参信息subscribeName={subscribeName},msg={msg},headers={headers},error={error}", subscribeName, msg.ToJsonString(), headers.ToJsonString(), ex.Message);
                throw;
            }
            finally
            {
                var res = _producerConnection.Return(producer);
                _logger.LogTrace("开始发送同步消息,正在归还生产者,subscribeName={subscribeName},msg={msg},headers={headers},isReturnSuccess={isReturnSuccess}", subscribeName, msg.ToJsonString(), headers.ToJsonString(), res);
            }
        }


        /// <summary>
        /// 
        /// </summary>
        /// <param name="subscribeName"></param>
        /// <param name="msg"></param>
        /// <param name="headers"></param>
        /// <returns></returns>
        public async Task PublishAsync(string subscribeName, object msg = null, IDictionary<string, string> headers = null)
        {
            _logger.LogTrace("开始发送异步消息,subscribeName={subscribeName},msg={msg},headers={headers}", subscribeName, msg.ToJsonString(), headers.ToJsonString());
            var producer = GetProducer();
            try
            {
                //转换消息
                var messageModel = new MessageModel()
                {
                    Id = Guid.NewGuid().ToString(),
                    MessageContent = msg?.ToJsonString(),
                    SubscribeName = subscribeName
                };
                var result = await producer.ProduceAsync(subscribeName, new Message<string, byte[]>
                {
                    Headers = headers.ToHeaders(),
                    Value = messageModel.ToByte(),
                });
                if (result.Status == PersistenceStatus.Persisted || result.Status == PersistenceStatus.PossiblyPersisted)
                {
                    _logger.LogTrace("开始发送异步消息,消息发送成功,subscribeName={subscribeName}", subscribeName);
                }
                else
                {
                    _logger.LogTrace("开始发送异步消息,消息发送失败,subscribeName={subscribeName},msg={msg},headers={headers},result={result}", subscribeName, msg.ToJsonString(), headers.ToJsonString(), result);
                    throw new InvalidOperationException($"消息发送失败，subscribeName={subscribeName}");
                }
            }
            catch (Exception ex)
            {
                _logger.LogError("开始发送异步消息,消息发送失败,入参信息subscribeName={subscribeName},msg={msg},headers={headers},error={error}", subscribeName, msg.ToJsonString(), headers.ToJsonString(), ex.Message);
                throw;
            }
            finally
            {
                var res = _producerConnection.Return(producer);
                _logger.LogTrace("开始发送异步消息,正在归还生产者,subscribeName={subscribeName},msg={msg},headers={headers},isReturnSuccess={isReturnSuccess}", subscribeName, msg.ToJsonString(), headers.ToJsonString(), res);
            }
        }

        /// <summary>
        /// 获取生产者
        /// </summary>
        protected virtual IProducer<string, byte[]> GetProducer()
        {
            return _producerConnection.Rent();
        }
    }
}
